package com.flink_demo.demo.kafka;

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class KafkaWordCount {
	public static final String ZOOKEEPER_HOST = "192.168.2.200:2181";
	public static final String KAFKA_BROKER = "192.168.2.200:9092";
	public static final String TRANSACTION_GROUP = "KafkaStream";

	public static void main(String... args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.enableCheckpointing(1000);
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

		// configure Kafka consumer
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
		kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER);
		kafkaProps.setProperty("group.id", TRANSACTION_GROUP);
		System.out.println(env.getStreamTimeCharacteristic());

		// topicd的名字是new，schema默认使用SimpleStringSchema()即可
		DataStreamSource<String> transaction = env
				.addSource(new FlinkKafkaConsumer010<String>("test-topic1", new SimpleStringSchema(), kafkaProps));
		transaction.map(new MapFunction<String, Integer>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Integer map(String val) throws Exception {
				// TODO Auto-generated method stub
				return Integer.valueOf(val);
			}
		}).flatMap(new FlatMapFunction<Integer, Tuple2<String, Integer>>() {

			private static final long serialVersionUID = 1L;

			@Override
			public void flatMap(Integer value, Collector<Tuple2<String, Integer>> out) throws Exception {
				out.collect(new Tuple2<String, Integer>("sum", value));
//				Word word = new Word();
//				word.word = "sum";
//				word.value = value;
//				out.collect(word);
			}
		})
//		.keyBy("word").sum("value").print();
		.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();

		env.execute();
	}
	
	public static class Word {
		public String word;
		public int value;
	}
}
